package com.zyx.pulsardemo.basic.readers;

import com.zyx.pulsardemo.basic.base.PulsarDemoBase;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;


/**
 * Base class for the Pulsar {@link Reader} demos.
 *
 * <p>Subclasses supply a reader through {@link #getReader()}; {@link #startReader()} then
 * consumes it on a separate thread and prints every message to standard output.
 *
 * <p>Reference: https://github.com/david-streamlio/pulsar-in-action
 *
 * @author Yaxi.Zhang
 * @since 2021/9/30 18:02
 */
public abstract class PulsarReaderDemoBase extends PulsarDemoBase {

    /**
     * Starts a new thread that reads from the reader returned by {@link #getReader()}
     * and prints each message payload, decoded as UTF-8, to standard output.
     *
     * <p>The loop always performs at least one read and stops once
     * {@link Reader#hasReachedEndOfTopic()} reports {@code true}. Any exception raised
     * while reading ends the loop and its stack trace is printed; it is not propagated
     * to the caller. The reader is not closed by this method.
     *
     * @throws PulsarClientException declared for callers and overriding subclasses;
     *         nothing in this implementation throws it
     */
    protected void startReader() throws PulsarClientException {
        final Reader<byte[]> reader = getReader();
        Runnable run = () -> {
            try {
                do {
                    // readNext() is called without a timeout.
                    Message<byte[]> msg = reader.readNext();
                    // Decode with an explicit charset so the output does not depend on the
                    // JVM's platform default. NOTE(review): assumes payloads are UTF-8 text.
                    String payload = new String(msg.getData(), StandardCharsets.UTF_8);
                    System.out.printf("Message read: %s \n", payload);
                } while (!reader.hasReachedEndOfTopic());
            } catch (Exception ex) {
                // Boundary of the reader thread: report the failure and let the thread end.
                ex.printStackTrace();
            }
        };
        new Thread(run).start();
    }

    /**
     * Supplies the reader that {@link #startReader()} consumes.
     *
     * @return the reader to read messages from; must not be {@code null}, because
     *         {@link #startReader()} dereferences it without a check
     */
    protected abstract Reader<byte[]> getReader();

}
